RxJava 1.x:你一定会用到的常用操作符

石朝辉2016-09-19 09:29JavaJava RxJava

转载请注明出处,点击此处open in new window 查看更多精彩内容

RxJava 操作符

本文基于 RxJava1.x 版本,阅读本文前请先了解 RxJavaopen in new window  的基本使用。

RxJava 版本已升级到 RxJava2.xopen in new window ,各个 APIopen in new window 均有不同程度的变化,具体请查看官方文档open in new window

参考文档:


1 Observableopen in new window 的创建

1.1 from()

public static <T> Observable<T> from(Iterable<? extends T> iterable);open in new window

转换集合为一个每次发射集合中一个元素的 Observableopen in new window 对象。

使用场景: 对集合(数组、List 等)进行遍历。

from()

其他 from() API:

举例:

// 1. 遍历集合
Observable<String> observable = Observable.from(new String[]{"hello", "hi"});
// 2. 使用 Future 创建 Observable,Future 表示一个异步计算的结果。
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        // TODO 执行异步操作并返回数据
        return "hihi";
    }
});

Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        futureTask.run();
    }
});

Observable<String> observable = Observable.from(futureTask);

1.2 just()

public static <T> Observable<T> just(final T value);open in new window

转换一个或多个 Object 为依次发射这些 Object 的 Observableopen in new window 对象。

使用场景: 转换一个或多个普通 Object 为 Observableopen in new window 对象,如转换数据库查询结果、网络查询结果等。

just()

其他 just() API:

举例:

Observable<String> observable = Observable.just("hello");

// 使用 just() 遍历几个元素
Observable<String> observable = Observable.just("hello", "hi", "...");
       
// 使用 from() 方法遍历,效果和 just() 一样。
String[] stringArrs = new String[]{"hello", "hi", "..."};
Observable<String> observable = Observable.from(stringArrs);

just() 方法可传入 1~10 个参数,也就说当元素个数小于等于 10 的时候既可以使用  just() 也可以使用 from(),否则只能用 from() 方法。

1.3 create()

public static <T> Observabl<T> create(OnSubscribe<T> f);open in new window

返回一个在被 OnSubscribe 订阅时执行特定方法的 Observableopen in new window 对象。

使用场景: 不推荐使用,可使用其他操作符替代,如使用 from()操作符完成遍历。

其他 create() API:

举例:

Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe< String >() {
    @Override
    public void call(Subscriber<? super String > subscriber) {
         // onNext() 方法可执行多次
        subscribe.onNext("hello");
        subscribe.onCompleted();
    }
};
Observable<Object> observable = Observable.create(onSubscribe);

此方法不常用,大多数时候都是使用 just()from() 等方法,如上面那串代码就可以写成:

Observable<Object> observable = Observable.just("hello");

1.4 interval()

public static Observable<Long> interval(long interval, TimeUnit unit);open in new window

返回一个每隔指定的时间间隔就发射一个序列号的 Observableopen in new window 对象。

使用场景: 可使用该操作符完成定时、倒计时等功能。

interval()

其他 interval() API:

举例:

// 每隔 1 s 发送一个序列号,序列号从 0 开始,每次累加 1。
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);

1.5 timer()

public static Observable<Long> timer(long delay, TimeUnit unit);open in new window

创建一个在指定延迟时间后发射一条数据(固定值:0)的 Observableopen in new window 对象。

使用场景: 可用来完成定时功能。

timer()

其他 timer() API:

举例:

// 定时 3 s
Observable<Long> observable = Observable.timer(3, TimeUnit.SECONDS);

1.6 range()

public static Observable<Integer> range(int start, int count);open in new window

创建一个发射指定范围内的连续整数的 Observableopen in new window 对象。

使用场景: 可使用该操作符完成一个 fori 的循环,如 for(int i=5;i<=7;i++) --> Observable.range(5, 3)

range()

其他 range() API:

举例:

// 依次发射 5、6、7
Observable<Integer> observable = Observable.range(5, 3);

1.7 empty()

public static <T> Observable<T> empty();open in new window

创建一个不发射任何数据就发出 onCompleted() 通知的 Observableopen in new window 对象。

empty()

举例:

// 发出一个 onCompleted() 通知
Observable<Object> observable = Observable.empty();

1.8 error()

public static <T> Observable<T> error(Throwable exception);open in new window

创建不发射任何数据就发出 onError 通知的 Observableopen in new window 对象。

使用场景: 程序中捕获异常后,可使用该操作符把捕获的异常传递到后面的逻辑中处理。

error()

举例:

// 发出一个 onError() 通知
Observable<Object> observable = Observable.error(new Throwable("message"));

1.9 never()

public static <T> Observable<T> never();open in new window

创建一个不发射任何数据和通知的 Observableopen in new window 对象。

never()

举例:

Observable<Object> observable = Observable.never();

1.10 defer()

public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);open in new window

在订阅的时候才会创建 Observable 对象;每一次订阅都创建一个新的 Observableopen in new window 对象。

使用场景: 可以使用该操作符封装需要被多次执行的函数。

defer()

举例:

Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
        return Observable.just("string");
    }
});

2 重做

2.1 repeat()

public final Observable<T> repeat();open in new window

使Observable 对象在发出 onNext() 通知之后重复发射数据。重做结束才会发出 onComplete() 通知,若重做过程中出现异常则会中断并发出 onError() 通知。

使用场景: 可使用该操作符指定一次任务执行完成后立即重复执行上一次的任务,如发送多次网络请求等。

repeat()

其他 repeat() API:

举例:

Observable<String> observable = Observable.just("string");
// 无限重复执行
observable.repeat();
// 重复执行 5 次
observable.repeat(5);

2.2 repeatWhen()

public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler)open in new window

使Observable 对象在发出 onNext() 通知之后有条件的重复发射数据。重做结束才会发出 onCompleted() 通知,若重做过程中出现异常则会中断并发出 onError() 通知。

使用场景: 可使用该操作符指定满足一定条件时重复执行一个任务,如发送多次网络请求等。

repeatWhen()

其他 repeatWhen() API:

举例:

observable.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Void> observable) {
        // 重复 3 次, 每次间隔 1 s
        return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
            @Override
            public Integer call(Void aVoid, Integer integer) {
                return integer;
             }
        }).flatMap(integer -> Observable.timer(1, TimeUnit.SECONDS));
    }
});

3 重试

3.1 retry()

public final Observable<T> retry();open in new window

在执行 Observableopen in new window对象的序列出现异常时,不直接发出 onError() 通知,而是重新订阅该 Observableopen in new window对象,直到重做过程中未出现异常,则会发出 onNext()onCompleted() 通知;若重做过程中也出现异常,则会继续重试,直到达到重试次数上限,超出次数后发出最新的 onError() 通知。

使用场景: 网络等请求异常出错后,可重新发起请求。

retry()

其他 retry() API:

举例:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        System.out.println(".......");
        int a = 1 / 0;
        subscriber.onNext(a);
        subscriber.onCompleted();
    }
});
// 无限次的重试
observable.retry();
// 重试 3 次
observable.retry(3);

// 使用谓语函数决定是否重试
observable.retry(new Func2<Integer, Throwable, Boolean>() {
    @Override
    public Boolean call(Integer integer, Throwable throwable) {
        // 参数 integer 是订阅的次数; 参数 throwable 是抛出的异常
        // 返回值为 true 表示重试, 返回值为 false 表示不重试
        return false;
    }
});

3.2 retryWhen()

public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);open in new window

有条件的执行重试。

使用场景: 网络等请求异常出错后,若满足一定条件,则重新发起请求。

retryWhen()

其他 retryWhen() API:

举例:

// 重试 3 次,每次间隔 1 s
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Object>() {
            @Override
            public Object call(Throwable throwable, Integer integer) {
                return integer;
            }
        }).flatMap(new Func1<Object, Observable<?>>() {
            @Override
            public Observable<?> call(Object o) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
});

4 变换

4.1 map()

public final <R> Observable<R> map(Func1<? super T, ? extends R> func);open in new window

把源 Observableopen in new window 发射的元素应用于指定的函数,并发送该函数的结果。

使用场景: 将从网络获取的数据(NetData 对象)转换为数据库相关对象(DBData对象)并使用 Observableopen in new window 发送。

map()

举例:

Observable.just(2)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.valueOf(String.format("原始数据的两倍为: %s", integer * 2));
            }
        });

4.2 flatMap()

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func);open in new window

转换源 Observableopen in new window 对象为另一个 Observableopen in new window 对象。

使用场景: 从网络获取数据并使用 obsA 对象发射,flatMap() 操作符中可将数据存进数据库并返回一个新的对象 obsB。

flatMap()

其他 flatMap() API:

举例:

Observable.just(2)
        .flatMap(new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer integer) {
                // 转换为一个定时 integer 秒的 Observable 对象
                return Observable.timer(integer, TimeUnit.SECONDS);
            }
        });

5 过滤

5.1 filter()

public final Observable<T> filter(Func1<? super T, Boolean> predicate);open in new window

只发射满足指定谓词的元素。

使用场景: 可使用 filter 代替 if 语句。

filter()

举例:

Observable.just(-1, -2, 0, 1, 2)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 0;
            }
        });

5.2 first()

public final Observable<T> first();open in new window

返回一个仅仅发射源 Observableopen in new window 发射的第一个[满足指定谓词的]元素的 Observableopen in new window,如果源 Observableopen in new window 为空,则会抛出一个 NoSuchElementException

使用场景: 顺序发出多条数据,只接收第一条。

first()

其他 first() API:

举例:

// 发射第一个元素
Observable.just(-1, -2, 0, 1, 2).first();

// 发射满足条件的第一个元素
Observable.just(-1, -2, 0, 1, 2)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 0;
            }
        });

// 会抛出 NoSuchElementException 异常
Observable.empty().first();

5.3 last()

public final Observable<T> last();open in new window

返回一个仅仅发射源 Observableopen in new window 发射的倒数第一个[满足指定谓词的]元素的 Observableopen in new window,如果源 Observableopen in new window 为空,则会抛出一个 NoSuchElementException

使用场景: 顺序发出多条数据,只接收最后一条。

last()

其他 last() API:

举例:

// 发射倒数第一个元素
Observable.just(-1, -2, 0, 1, 2).first();

// 发射满足条件的倒数第一个元素
Observable.just(-1, -2, 0, 1, 2)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer < 0;
            }
        });

// 会抛出 NoSuchElementException 异常
Observable.empty().last();

5.4 skip()

public final Observable<T> skip(int count);open in new window

跳过前面指定数量或指定时间内的元素,只发射后面的元素。

skip()

其他 skip() API:

举例:

Observable.just(-1, -2, 0, 1, 2)
        .skip(2) // 跳过前两条数据

5.5 skipLast()

public final Observable<T> skipLast(int count);open in new window

跳过前面指定数量或指定时间内的元素,只发射后面的元素。指定时间时会延迟源 Observableopen in new window 发射的任何数据。

skipLast()

其他 skipLast() API:

举例:

Observable.just(-1, -2, 0, 1, 2)
        .skipLast(2) // 跳过后两条数据

5.6 take()

public final Observable<T> take(final int count);open in new window

只发射前面指定数量或指定时间内的元素。

take()

其他 take() API:

举例:

Observable.just(-1, -2, 0, 1, 2).take(3); // 只发射前三条数据

5.7 takeLast()

public final Observable<T> takeLast(final int count);open in new window

只发射后面指定数量或指定时间内的元素。指定时间时会延迟源 Observableopen in new window 发射的任何数据。

takeLast()

其他 takeLast() API:

举例:

Observable.just(-1, -2, 0, 1, 2).takeLast(3); // 只发射后三条数据

5.8 sample()

public final Observable<T> sample(long period, TimeUnit unit);open in new window

定期发射 Observableopen in new window 发射的最后一条数据。

sample()

其他 sample() API:

举例:

Observable.interval(300, TimeUnit.MILLISECONDS)
        .sample(2, TimeUnit.SECONDS)

5.9 elementAt()

public final Observable<T> elementAt(int index);open in new window

只发射指定索引的元素。 使用场景: 按索引去集合中的元素等。

elementAt()

举例:

Observable.just(-1, -2, 0, 1, 2).elementAt(2); // 发射索引为 2 的数据

5.10 elementAtOrDefault()

public final Observable<T> elementAtOrDefault(int index, T defaultValue);open in new window

只发射指定索引的元素,若该索引对应的元素不存在,则发射默认值。

elementAtOrDefault()

举例:

Observable.just(-1, -2, 0, 1, 2).elementAtOrDefault(9, -5); // 发射索引为 9的数据,若不存在,则发射 -5

5.11 ignoreElements()

public final Observable<T> ignoreElements();open in new window

不发射任何数据,直接发出 onCompleted() 通知。

ignoreElements()

举例:

Observable.just(-1, -2, 0, 1, 2).ignoreElements()

5.12 distinct()

public final Observable<T> distinct();open in new window

过滤重复的元素,过滤规则是:只允许还没有发射过的元素通过。

distinct()

其他 distinct() API:

举例:

// 直接过滤
Observable.just(-1, -2, 0, 1, 2, 1).distinct();

// 通过生成的 key 值过滤
Observable.just(-1, -2, 0, 1, 2, 1).distinct(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) {
        // 随机生成 key
        return integer * (int)(Math.random() * 10);
    }
});

5.13 debounce()

public final Observable<T> debounce(long timeout, TimeUnit unit)open in new window

源 Observableopen in new window 每产生结果后,如果在规定的间隔时间内没有产生新的结果,则发射这个结果,否则会忽略这个结果。该操作符会过滤掉发射速率过快的数据项。

debounce()

其他 debounce() API:

举例:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            //产生结果的间隔时间分别为100、200、300...900毫秒
            for (int i = 1; i < 10; i++) {
                subscriber.onNext(i);
                Thread.sleep(i * 100);
            }
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
});
observable.debounce(400, TimeUnit.MILLISECONDS)  // 超时时间为400毫秒

该例子产生结果为:依次打印5、6、7、8。

附:功能实现

延时遍历

// 遍历
Observable<Integer> traverseObservable = Observable.just(3, 4, 5, 6);
// 计时
Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
        
Func2<Long, Integer, Integer> func2 = new Func2<Long, Integer, Integer>() {
    @Override
    public Integer call(Long aLong, Integer integer) {
        return integer;
    }
};

intervalObservable.zipWith(traverseObservable, func2)
        .toBlocking()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

           @Override
           public void onError(Throwable e) {
               e.printStackTrace();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

倒计时

int startTime = 10;

Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(startTime + 1) // 接收 startTime + 1 次
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long time) {
                // 1 2 3...转换为...3 2 1
                return startTime - time;
            }
        })
        .toBlocking()
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("倒计时结束");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("倒计时出现异常");
                e.printStackTrace();
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(String.format("倒计时: %s s", aLong));
            }
       });
最后更新于 2024-02-04 02:20:03